-
Notifications
You must be signed in to change notification settings - Fork 919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Row-group-level partitioning for Parquet #9849
Conversation
cc @rjzamora |
Yes. in fact that exactly how I did it in this partitioning PR #9810 cudf/cpp/src/io/parquet/chunk_dict.cu Line 303 in 200d1b0
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice feature @calebwin! I have some comments below. I'm not deeply familiar with this code so let me know if anything I suggested seems off base.
auto start_row = 0; | ||
for (auto i = 0; i < block_x; i++) { | ||
start_row += fragments[0][i].num_rows; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps these offsets should be pre-computed with a scan and then passed into the kernel? I'm not sure how many row groups we expect. The difference between 10 and 1M would indicate whether this should be a host or device computation.
If we shouldn't use a scan and pass in the precomputed offsets, then this could use std::accumulate
. Might look something like this snippet (untested):
auto start_row = 0; | |
for (auto i = 0; i < block_x; i++) { | |
start_row += fragments[0][i].num_rows; | |
} | |
auto row_counter = thrust::transform_iterator(fragments[0], [] __device__(auto const& page){ return page.num_rows; }); | |
auto start_row = std::accumulate(row_counter[0], row_counter[block_x], 0); |
(Note: page
might not be the right name for the function argument, I am just guessing from device_2dspan<PageFragment>
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean like this:
cudf/cpp/src/io/parquet/chunk_dict.cu
Line 107 in f44a50b
size_type start_row = frag.start_row; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My 2cts:
Let's aim to merge this first so Caleb has a chance to get the PR as close as possible to the finish line as possible. If 9810 already addresses some comments here, maybe those pieces can be applied to this PR (also reduces merge conflicts).
I'm not sure what's the best approach, inclined to leave the decision up to @devavret and @calebwin .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @bdice @hyperbolic2346 @devavret @vuule for reviews and comments. I just ran into a subtle CUDA bug in this PR when I was in the middle of writing a benchmark. I looked through #9810 and it looks like there are changes in the CUDA code that may handle edge cases I didn't consider here.
So I'm going to go ahead and try to merge #9810 into this and make appropriate changes. I will see if that resolves the issue I came across when benchmarking. I will then try to address other reviews here.
Should I convert this PR to draft in the meanwhile?
auto start_row = 0; | ||
for (auto i = 0; i < block_x; i++) { | ||
start_row += fragments[0][i].num_rows; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as previous comment.
@@ -20,6 +20,7 @@ | |||
*/ | |||
|
|||
#include <io/statistics/column_statistics.cuh> | |||
#include "io/parquet/parquet_gpu.hpp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably use <>
braces.
#include "io/parquet/parquet_gpu.hpp" | |
#include <io/parquet/parquet_gpu.hpp> |
|
||
#include <iostream> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this was left in from some print debugging? If it is needed, it should go with the other section of stdlib headers like #include <algorithm>
below here, rather than with the rmm includes.
cudf::detail::hostdevice_2dvector<gpu::PageFragment> fragments( | ||
num_columns, num_fragments, stream); | ||
|
||
if (row_group_sizes_specified) { | ||
// auto fragments_span = host_2dspan<gpu::PageFragment>{fragments}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this commented?
write_df.to_parquet( | ||
fil, | ||
index=preserve_index, | ||
row_group_cols=row_group_cols, | ||
**kwargs, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this section of code can be written to only call to_parquet
once. Something roughly like this, which updates **kwargs
and optionally keeps the result:
if return_metadata:
kwargs["metadata_file_path"] = fs.sep.join([subdir, filename])
metadata_result = write_df.to_parquet(
fil,
index=preserve_index,
row_group_cols=row_group_cols,
**kwargs,
)
if return_metadata:
metadata.append(metadata_result)
for a, b in zip(col_names, df.columns): | ||
assert a == b |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this equivalent?
for a, b in zip(col_names, df.columns): | |
assert a == b | |
assert col_names == df.columns |
Aside from being shorter, it is preferable to compare all the column names at once because it produces a nicer error message if it fails.
num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname) | ||
|
||
assert num_rows == len(df.index) | ||
assert row_groups == len(row_group_sizes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line verifies the number of row groups, but I don't think this test is checking the number of rows in each row group. That seems important to test here.
Column names by which to partition the dataset across row groups in the | ||
resulting Parquet file | ||
Columns are partitioned in the order they are given |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sentences in docstrings should end in a period. Line breaks should be avoided except to wrap at the column limit.
Column names by which to partition the dataset across row groups in the | |
resulting Parquet file | |
Columns are partitioned in the order they are given | |
Column names by which to partition the dataset across row groups in the | |
resulting Parquet file. Columns are partitioned in the order they are | |
given. |
tmpdir, partition_on=partition_on, row_group_cols=row_group_cols | ||
) | ||
ddf_read = dask_cudf.read_parquet(tmpdir) | ||
assert_eq(len(ddf), len(ddf_read)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_eq
is intended for more complicated assertions about dataframes being equivalent. Comparing lengths should be done with a plain assert
. However, it is probaby a good idea to make sure the data frame written/read is equivalent to the source dataframe in memory:
assert_eq(len(ddf), len(ddf_read)) | |
assert len(ddf) == len(ddf_read) | |
assert_eq(ddf, ddf_read) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this. Some comments. Interested to see this progress.
if (fragment_size != -1) { | ||
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows)); | ||
} else { | ||
s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (fragment_size != -1) { | |
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows)); | |
} else { | |
s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows; | |
} | |
s->frag.num_rows = fragment_size != -1 ? min(fragment_size, max_num_rows - min(start_row, max_num_rows)) : frag[blockIdx.x][blockIdx.y].num_rows; |
if (row_group_sizes_specified) { | ||
num_fragments = 0; | ||
for (std::size_t i = 0; i < row_group_sizes.size(); i++) { | ||
num_fragments += (row_group_sizes[i] + max_page_fragment_size - 1) / max_page_fragment_size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a std::accumulate
as Bradley showed above.
This PR has been labeled |
This PR has been labeled |
I'm going to close this since it's fairly out of date and it's not clear if we still want this as is. Feel free to reopen if work on this restarts. |
This PR introduces a
row_group_cols
parameter forcudf.to_parquet
that groups data by the given columns are writes separate columns to separate row groups (row groups are groups of rows within a Parquet file). This is similar topartition_cols
except instead of separate groups being written to separate files, separate groups are written to separate row groups within a file.You should use
row_group_cols
when you want to partition data on a column but there are too many groups (or combinations of groups if you are partitioning on a secondary column) that would result in too many small files.What's left:
Notes to reviewers:
dask_cudf.to_parquet
acceptspartition_on
androw_group_cols
, shouldrow_group_cols
be renamed torow_group_on
for API consistency?populate_chunk_hash_maps
andget_dictionary_indices
kernels correct?row_group_cols
- is this a bug we should fix before merging? It seems to be related to [1] and [2].[1] https://issues.apache.org/jira/browse/ARROW-9136
[2] pandas-dev/pandas#34790.